Expand description
§futures-buffered
This project provides a single future structure: FuturesUnorderedBounded
.
Much like futures::stream::FuturesUnordered
,
this is a thread-safe, Pin
friendly, lifetime friendly, concurrent processing stream.
The is different to FuturesUnordered
in that FuturesUnorderedBounded
has a fixed capacity for processing count.
This means it’s less flexible, but produces better memory efficiency.
§Benchmarks
§Speed
Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:
FuturesUnordered time: [420.47 ms 422.21 ms 423.99 ms]
FuturesUnorderedBounded time: [366.02 ms 367.54 ms 369.05 ms]
§Memory usage
Running 512000 Ready<i32>
futures with 256 concurrent jobs.
- count: the number of times alloc/dealloc was called
- alloc: the number of cumulative bytes allocated
- dealloc: the number of cumulative bytes deallocated
FuturesUnordered
count: 1024002
alloc: 40960144 B
dealloc: 40960000 B
FuturesUnorderedBounded
count: 2
alloc: 8264 B
dealloc: 0 B
§Conclusion
As you can see, FuturesUnorderedBounded
massively reduces you memory overhead while providing a significant performance gain.
Perfect for if you want a fixed batch size
§Example
use futures::future::Future;
use futures::stream::StreamExt;
use futures_buffered::FuturesUnorderedBounded;
use hyper::client::conn::http1::{handshake, SendRequest};
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;
// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;
// perform the http handshakes
let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn);
/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
let req = Request::builder()
.header("Host", "example.com")
.method("GET")
.body(String::new())
.unwrap();
rs.send_request(req)
}
// create a queue that can hold 128 concurrent requests
let mut queue = FuturesUnorderedBounded::new(128);
// start up 128 requests
for _ in 0..128 {
queue.push(make_req(&mut rs));
}
// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
queue.next().await;
queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
queue.next().await;
}
Structs§
- Stream for the
buffered_unordered
method. - Stream for the
buffered_ordered
method. - An unbounded queue of futures.
- An unbounded queue of futures.
- A set of futures which may complete in any order.
- A set of futures which may complete in any order.
- Future for the
join_all
function. - A combined stream that releases values in any order that they come
- A combined stream that releases values in any order that they come.
- Stream for the
try_buffered_unordered
method. - Stream for the
try_buffered_ordered
method. - Future for the
try_join_all
function.
Traits§
- An extension trait for
Stream
s that provides a variety of convenient combinator functions. - An extension trait for
Stream
s that provides a variety of convenient combinator functions. - A convenience for futures that return
Result
values that includes a variety of adapters tailored to such futures. - A convenience for streams that return
Result
values that includes a variety of adapters tailored to such futures.
Functions§
- Creates a future which represents a collection of the outputs of the futures given.
- Creates a future which represents a collection of the outputs of the futures given.
Type Aliases§
- Merge
Deprecated